Streaming & Cancellation
Understanding interruption, priority channels, and graceful shutdown
HPD-Agent provides two distinct mechanisms for stopping agent execution: CancellationToken (hard stop) and Interruption (graceful stop). Understanding when to use each is critical for building responsive UIs.
CancellationToken vs Interruption
The table below summarizes the two mechanisms and when to use each:
| Method | Use When | What Happens | User Experience |
|---|---|---|---|
| CancellationToken | User hits Ctrl+C, emergency stop, timeout | Throws OperationCanceledException, entire loop stops immediately | Nuclear option - everything stops now |
| Interruption | User clicks "Stop" button in UI | Drops CanInterrupt=true events (text deltas), delivers completion events | Graceful stop - gets MessageTurnFinishedEvent |
When to Use Which
- Ctrl+C in console: CancellationToken (user wants immediate exit)
- Stop button in web UI: Interruption (graceful, clean UI state)
- Timeout: CancellationToken (hard deadline)
- User changes mind mid-stream: Interruption (get completion events)
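The contrast between the two stops can be sketched without the framework. This is a minimal simulation under stated assumptions, not HPD-Agent's implementation: `StopFlag`, `StopModes.RunAsync`, and the string events are hypothetical stand-ins for the real interruption state, agent loop, and event types.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Graceful stop: flip the flag after three deltas; the finish marker still arrives.
var stop = new StopFlag();
var seen = 0;
await foreach (var evt in StopModes.RunAsync(stop, CancellationToken.None))
{
    Console.WriteLine(evt);
    if (evt.StartsWith("delta") && ++seen == 3) stop.Interrupted = true;
}

// Hard stop: cancel after three deltas; the loop ends with an exception instead.
using var cts = new CancellationTokenSource();
seen = 0;
try
{
    await foreach (var evt in StopModes.RunAsync(new StopFlag(), cts.Token))
    {
        Console.WriteLine(evt);
        if (++seen == 3) cts.Cancel();
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("cancelled: no finish marker");
}

// Hypothetical stand-in for the framework's interruption state.
public sealed class StopFlag
{
    public volatile bool Interrupted;
}

public static class StopModes
{
    // Simulated agent turn: ten text deltas followed by one finish marker.
    public static async IAsyncEnumerable<string> RunAsync(
        StopFlag stop,
        [EnumeratorCancellation] CancellationToken ct)
    {
        for (var i = 0; i < 10; i++)
        {
            ct.ThrowIfCancellationRequested();   // hard stop surfaces as an exception
            await Task.Yield();
            if (stop.Interrupted) continue;      // graceful stop drops interruptible events
            yield return $"delta {i}";
        }
        ct.ThrowIfCancellationRequested();
        yield return "finished";                 // non-interruptible: delivered unless cancelled
    }
}
```

In the first loop the consumer sees three deltas and then `finished`, so a UI can reset its state from the completion event. In the second loop the consumer sees three deltas and then lands in the `catch` block, so any cleanup has to happen there.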
CancellationToken (Hard Stop)
Console Application Pattern
using var cts = new CancellationTokenSource();
// Hook Ctrl+C
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // Prevent immediate process exit
    cts.Cancel();    // Trigger cancellation
};
try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        if (evt is IObservabilityEvent) continue;
        switch (evt)
        {
            case TextDeltaEvent delta:
                Console.Write(delta.Text);
                break;
            case MessageTurnFinishedEvent:
                Console.WriteLine("\n✓ Done");
                break;
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("\n⚠ Cancelled by user");
}
Web Application Pattern
// In your ASP.NET endpoint
app.MapPost("/agent/stream", async (
    HttpContext context,
    MessageRequest request) =>
{
    // Use the HTTP request cancellation token
    var ct = context.RequestAborted;
    // Stream the events as Server-Sent Events
    context.Response.ContentType = "text/event-stream";
    try
    {
        await foreach (var evt in agent.RunAsync(request.Messages, cancellationToken: ct))
        {
            // If the client disconnects, ct triggers and the stream stops
            var json = AgentEventSerializer.ToJson(evt);
            await context.Response.WriteAsync($"data: {json}\n\n", ct);
            await context.Response.Body.FlushAsync(ct);
        }
    }
    catch (OperationCanceledException)
    {
        // Client disconnected - clean up silently
    }
});
Timeout Pattern
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        if (evt is IObservabilityEvent) continue;
        await HandleEventAsync(evt);
    }
}
catch (OperationCanceledException) when (cts.Token.IsCancellationRequested)
{
    Console.WriteLine("\n⚠ Operation timed out after 5 minutes");
}
Interruption (Graceful Stop)
Note: Stream interruption is an advanced feature. Most applications should use CancellationToken for simplicity. Use interruption when you need graceful completion events in your UI.
Understanding Interruption
Interruption drops interruptible events (like text deltas) while still delivering completion events:
// Events with CanInterrupt = true (dropped on interruption)
- TextDeltaEvent
- ReasoningDeltaEvent
- ToolCallArgsEvent
- Most observability events
// Events with CanInterrupt = false (always delivered)
- MessageTurnFinishedEvent
- TextMessageEndEvent
- ToolCallEndEvent
- Error events
Basic Interruption Example
// Access the stream registry (requires custom setup)
var streamRegistry = agent.GetStreamRegistry(); // Custom extension
// When user clicks "Stop" button
stopButton.Click += (s, e) =>
{
    streamRegistry.InterruptAll();
};
await foreach (var evt in agent.RunAsync(messages))
{
    if (evt is IObservabilityEvent) continue;
    switch (evt)
    {
        case TextDeltaEvent delta:
            // These get dropped after interruption
            Console.Write(delta.Text);
            break;
        case EventDroppedEvent dropped:
            // Observability: track what was dropped
            Console.WriteLine($"\n[Dropped {dropped.Count} events]");
            break;
        case MessageTurnFinishedEvent:
            // This ALWAYS arrives, even after interruption
            Console.WriteLine("\n✓ Done");
            break;
    }
}
Selective Interruption
Interrupt specific streams:
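// Interrupt streams matching a predicate
// Note: Duration does not appear on IStreamHandle as listed under
// Stream Management Interfaces; treat it as an assumption about your handle type
streamRegistry.InterruptWhere(stream =>
    stream.EmittedCount > 1000 || stream.Duration > TimeSpan.FromSeconds(30));
// Interrupt specific stream by ID
var stream = streamRegistry.Get("stream-123");
stream?.Interrupt();
Priority-Based Event Routing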
Most users never need to set priority manually; the defaults are sufficient. This section explains why your stop button works instantly even during heavy streaming.
How Priority Channels Work
┌─────────────────────────────────────────────────────────────┐
│  BidirectionalEventCoordinator                              │
│                                                             │
│  Priority Channel (capacity: 64)                            │
│  ├── Immediate (0): User cancellation, emergency stops      │
│  └── Control (1): State changes, interruption acks          │
│                                                             │
│  Standard Channel (unbounded)                               │
│  ├── Normal (2): TextDelta, ToolResult (DEFAULT)            │
│  └── Background (3): Metrics, telemetry                     │
│                                                             │
│  Upstream Channel (capacity: 64)                            │
│  └── Interruption signals flowing back through middleware   │
│                                                             │
│  ReadAllAsync() ← Merges channels in priority order         │
└─────────────────────────────────────────────────────────────┘
Why Stop Buttons Feel Instant
- Agent emits 1000 TextDeltaEvent events (Normal priority) → Standard channel
- User clicks "Stop" → InterruptionRequestEvent (Immediate priority) → Priority channel
- Coordinator reads Priority channel FIRST → Interruption processed immediately
- Remaining Normal events in Standard channel are dropped (if CanInterrupt=true)
Result: Stop button feels instant!
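The read-order part of this sequence can be sketched with `System.Threading.Channels`. This is a simplified illustration, not the actual `BidirectionalEventCoordinator`: `TwoLaneCoordinator`, `Emit`, and `Drain` are hypothetical names, events are plain strings, and the sketch shows only that the priority lane is read first (it does not drop anything).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// 1000 text deltas are already queued when the stop request arrives.
var coordinator = new TwoLaneCoordinator();
for (var i = 0; i < 1000; i++) coordinator.Emit($"delta {i}", urgent: false);
coordinator.Emit("interruption-request", urgent: true);

var order = coordinator.Drain();
Console.WriteLine(order[0]);     // the stop request is read before any delta
Console.WriteLine(order.Count);  // all 1001 queued events are returned

// Simplified, hypothetical stand-in for the coordinator's channel merge.
public sealed class TwoLaneCoordinator
{
    private readonly Channel<string> _priority = Channel.CreateBounded<string>(64);
    private readonly Channel<string> _standard = Channel.CreateUnbounded<string>();

    public void Emit(string evt, bool urgent)
    {
        var lane = urgent ? _priority : _standard;
        // TryWrite fails only when the bounded priority lane is full.
        if (!lane.Writer.TryWrite(evt))
            throw new InvalidOperationException("Priority lane is full.");
    }

    // Reads everything currently queued, always preferring the priority lane.
    public List<string> Drain()
    {
        var result = new List<string>();
        while (true)
        {
            if (_priority.Reader.TryRead(out var urgentEvt)) { result.Add(urgentEvt); continue; }
            if (_standard.Reader.TryRead(out var normalEvt)) { result.Add(normalEvt); continue; }
            return result;
        }
    }
}
```

Because the reader checks the priority lane before every standard read, the stop request never waits behind the backlog of deltas, no matter how long that backlog is.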
Priority Enum
public enum EventPriority
{
    Immediate = 0,  // User cancellation, emergency stops
    Control = 1,    // System control signals, interruption acks
    Normal = 2,     // Standard data flow (DEFAULT)
    Background = 3  // Metrics, telemetry, observability
}
Most events use Normal priority by default. The framework automatically assigns Immediate to interruption requests.
Event Stream Properties
All events inherit these streaming properties:
public abstract record AgentEvent
{
    // Stream control
    public EventPriority Priority { get; init; } = EventPriority.Normal;
    public string? StreamId { get; init; }
    public bool CanInterrupt { get; init; } = true;
    public EventDirection Direction { get; init; } = EventDirection.Downstream;
    public long SequenceNumber { get; internal set; }
    // Nested agent tracking
    public AgentExecutionContext? ExecutionContext { get; init; }
}
Event Direction
public enum EventDirection
{
    Downstream, // Normal: input → processing → output
    Upstream    // Control: cancellation signals flowing back
}
Upstream events flow through middleware in reverse order, enabling middleware to respond to interruptions.
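The reverse traversal can be pictured with a small sketch. The middleware names and the `Routing.Order` helper below are hypothetical and exist only for illustration; the real pipeline invokes middleware objects, not strings, and its registration API is not shown in this document.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical pipeline, listed in registration order.
var pipeline = new[] { "Logging", "Retry", "ToolExecution" };

Console.WriteLine(string.Join(" -> ", Routing.Order(pipeline, EventDirection.Downstream)));
// prints "Logging -> Retry -> ToolExecution"
Console.WriteLine(string.Join(" -> ", Routing.Order(pipeline, EventDirection.Upstream)));
// prints "ToolExecution -> Retry -> Logging"

public enum EventDirection { Downstream, Upstream }

public static class Routing
{
    // Downstream events visit middleware in registration order;
    // upstream events visit the same middleware in reverse.
    public static IReadOnlyList<string> Order(
        IReadOnlyList<string> middleware,
        EventDirection direction) =>
        direction == EventDirection.Upstream
            ? middleware.Reverse().ToList()
            : middleware.ToList();
}
```

Under this ordering, the middleware closest to the output is the first to see an upstream interruption signal.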
Stream Management Interfaces
Advanced Feature: These interfaces are for framework developers and advanced scenarios.
IStreamRegistry
public interface IStreamRegistry
{
    IStreamHandle Create(string? streamId = null);
    IStreamHandle? Get(string streamId);
    void InterruptAll();
    void InterruptWhere(Func<IStreamHandle, bool> predicate);
    IReadOnlyList<IStreamHandle> ActiveStreams { get; }
    int ActiveCount { get; }
}
IStreamHandle
public interface IStreamHandle
{
    string StreamId { get; }
    bool IsInterrupted { get; }
    bool IsCompleted { get; }
    int EmittedCount { get; }
    int DroppedCount { get; }
    void Interrupt();
    void Complete();
    Task WaitAsync(CancellationToken ct = default);
    event Action<IStreamHandle>? OnInterrupted;
    event Action<IStreamHandle>? OnCompleted;
}
Best Practices
Use CancellationToken for Most Scenarios
// Simple, reliable, works everywhere
using var cts = new CancellationTokenSource();
await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
{
    // ...
}
Always Handle OperationCanceledException
try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: ct))
    {
        // ...
    }
}
catch (OperationCanceledException)
{
    // Clean up, show cancellation message
    Console.WriteLine("\n⚠ Operation cancelled");
}
Combine CancellationToken Sources
// Multiple cancellation sources
using var timeoutCts = new CancellationTokenSource(TimeSpan.FromMinutes(5));
using var userCts = new CancellationTokenSource();
using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(
    timeoutCts.Token,
    userCts.Token);
await foreach (var evt in agent.RunAsync(messages, cancellationToken: combinedCts.Token))
{
    // Cancels on EITHER timeout OR user action
}
Don't Block the Event Stream
// WRONG: Blocks the thread and ignores cancellation
await foreach (var evt in agent.RunAsync(messages, cancellationToken: ct))
{
    Thread.Sleep(1000); // DON'T DO THIS!
}
// CORRECT: Use async operations
await foreach (var evt in agent.RunAsync(messages, cancellationToken: ct))
{
    await Task.Delay(1000, ct); // Respects cancellation
}
Common Patterns
Pattern 1: Stop Button in UI
private CancellationTokenSource? _cts;
async Task StartAgentAsync()
{
    _cts = new CancellationTokenSource();
    try
    {
        await foreach (var evt in agent.RunAsync(messages, cancellationToken: _cts.Token))
        {
            // Handle events
        }
    }
    catch (OperationCanceledException)
    {
        // User stopped via button
    }
    finally
    {
        _cts?.Dispose();
        _cts = null;
    }
}
void OnStopButtonClick()
{
    _cts?.Cancel();
}
Pattern 2: Automatic Timeout
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMinutes(5));
await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
{
    // Stops after 5 minutes by throwing OperationCanceledException;
    // wrap the loop in try/catch as described under Best Practices
}
Pattern 3: Progress-Based Cancellation
using var cts = new CancellationTokenSource();
var eventCount = 0;
try
{
    await foreach (var evt in agent.RunAsync(messages, cancellationToken: cts.Token))
    {
        eventCount++;
        // Cancel after 1000 events
        if (eventCount > 1000)
        {
            cts.Cancel();
        }
    }
}
catch (OperationCanceledException)
{
    // Event budget exceeded; the run was cancelled from inside the loop
}
See Also
- Events Overview - Event lifecycle and categories
- Consuming Events - Event handling patterns
- Building Console Apps - Ctrl+C patterns
- Building Web Apps - Stop button implementation